Kotlin StateFlow 搜索功能的实践 DB + NetWork
前言
在之前的文章中分析了如何在 MVVM 架构中使用 Kotlin Flow,以及 Kotlin Flow 为我们解决了以下问题:
LiveData 是一个生命周期感知组件,最好在 View 和 ViewModel 层中使用它,如果在 Repositories 或者 DataSource 中使用会有几个问题
它不支持线程切换,其次不支持背压,也就是在一段时间内发送数据的速度 > 接受数据的速度,LiveData 无法正确的处理这些请求 使用 LiveData 的最大问题是所有数据转换都将在主线程上完成 RxJava 虽然支持线程切换和背压,但是 RxJava 那么多傻傻分不清楚的操作符,实际上在项目中常用的可能只有几个例如
Observable
、Flowable
、Single
等等,如果我们不去了解背后的原理,造成内存泄露是很正常的事,大家可以从 StackOverflow 上查看一下,有很多因为 RxJava 造成内存泄露的例子RxJava 入门的门槛很高,学习过的朋友们,我相信能够体会到从入门到放弃是什么感觉
解决回调地狱的问题
而相对于以上的不足,Flow 有以下优点:
Flow 支持线程切换、背压 Flow 入门的门槛很低,没有那么多傻傻分不清楚的操作符 简单的数据转换与操作符,如 map 等等 Flow 是对 Kotlin 协程的扩展,让我们可以像运行同步代码一样运行异步代码,使得代码更加简洁,提高了代码的可读性 易于做单元测试
而这篇文章主要来分析一下 Kotlin StateFlow 搜索功能的实践,主要包含以下几个方面的内容:
Kotlin Flow 是什么?以及如何使用? 如何区分末端操作符还是中间操作符? Kotlin Channel 是什么?以及如何使用? Kotlin Channel 都有那几种类型? BroadcastChannels
是什么?以及如何在项目中使用?StateFlow
是什么?以及如何在项目中使用?Kotlin 常用操作符 debounce
、filter
、flatMapLatest
、distinctUntilChanged
解析?
之前有很多朋友跟我反馈,如何使用 Flow 实现搜索功能,所以我在 PokemonGo 项目中增加了两种搜索场景,分别演示 BroadcastChannels
和 StateFlow
的用法
。
使用 ConflatedBroadcastChannel 实现 DB 搜索 使用 StateFlow 实现 NetWork 搜索
在分析这两种实现方式之前,需要先了解几个基本概念, Flow 和 Channel 是什么,以及常用的操作符 debounce
、filter
、flatMapLatest
、 distinctUntilChanged
等等的使用,Flow 和 Channel 是一个比较大的概念,后面我会花好几篇文章来分析它们,本文只会概述它们之间的区别。
PokemonGo 项目地址:
https://github.com/hi-dhl/PokemonGo
Kotlin Flow 是什么
先来看看 Kotlin 官方文档是如何介绍 Flow
将上面这段话,简单的总结一下:
Flow 是非阻塞的,以挂起的方式执行,只有遇到末端操作符,才会触发所有操作的执行 所有操作都在相同的代码块内顺序执行 发射出来的值都是顺序执行的,只有在某一时刻结束(遇到 末端操作符 或者出现异常) map
,filter
,take
,zip
等等是中间操作符,collect
,collectLatest
,single
,reduce
,toList
等等末端操作符中间操作符构建了一个待执行的调用链,如下图所示:
不阻塞,以挂起的方式执行 :也就是协程作用域被挂起, 当前线程中协程作用域之外的代码不会阻塞
接下来我们来看一段示例:
suspend fun printValue() = flow<Int> {
for (index in 1..10) {
emit(index)
}
}.map { it -> it * it } // map, filter, take, zip 等等是中间操作符
.filter { it -> it > 5 }
.toList() // 只有遇到末端操作符 collect, collectLatest,single, reduce, toList 等等才会触发所有操作的执行
遇到中间操作符,并不会执行任何操作,也不会挂起函数本身,这些操作符构建了一个待执行的调用链 末端操作符是可挂起函数,遇到末端操作符会触发所有操作的执行
如何区分末端操作符还是中间操作符
区分末端操作符还是中间操作符,可以按照是否是挂起函数来区分,我个人觉得按照挂起函数来区分,方便去记忆上面提到的 Flow 的几个特点,当然也可以按照其他方式来区分,我们一起来分析一下源码。
// 中间操作符是 Flow 的扩展函数,它们最后都是通过 emit 来发射数据
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
if (predicate(value)) return@transform emit(value)
}
// 末端操作符是一个挂起函数
// 末端操作符无论是 collectLatest,single, reduce, toList 最后都是调用 collect
public suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T> = toCollection(destination)
public suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C {
collect { value ->
destination.add(value)
}
return destination
}
中间操作符是 Flow 的扩展函数,它们最后都是通过 emit
来发射数据末端操作符是一个挂起函数 末端操作符无论是 collectLatest
,single
,reduce
,toList
最后都是调用collect
Kotlin Channel 是什么
来看看 Kotlin 官方文档是如何介绍 Channel
将上面这段话,简单的总结一下:
Channel 是非阻塞的,它用于发送方 (SendChannel) 和接收方 (ReceiveChannel) 之间通信 Channel 实现了 SendChannel 和 ReceiveChannel 接口,所以既可以发送数据又可以接受数据 Channel 和 Java 中的 BlockingQueue 类似,不同之处在于 BlockingQueue 是阻塞的,而 Channel 是挂起的 发送方 (SendChannel) 和接收方 (ReceiveChannel) 之间通过缓冲区进行同步的,如下图所示:
通过发送方 (SendChannel) 将数据发送到缓冲区 通过接收方 (ReceiveChannel) 从缓冲区获取数据 发送方 (SendChannel) 和 接收方 (ReceiveChannel) 之间有一个通道,也就是缓冲区 缓冲区的作用帮我们同步发送方 (SendChannel) 和 接收方 (ReceiveChannel) 发送和接受的数据,也就意味着多个协程可以向同一个 channel 发送数据, 一个 channel 的数据也可以被多个协程接收
我们来实现一个简易的消息发送和接受的例子:
val channel = Channel<Int>()
// 接受消息
suspend fun receiveEvent() {
coroutineScope {
while (!channel.isClosedForReceive) {
// receive()方法异步获取元素,如果缓冲区是空,receive() 调用者将被挂起,直到一个新值被发送到缓冲区
// receive() 是一个挂起函数,用于同步发送方和接收方的一种机制
channel.receive()
// poll()方法同步获取一个元素,如果缓冲区是空的,则返回null
// channel.poll()
}
}
}
// 发送消息
suspend fun postEvent() {
coroutineScope {
if (!channel.isClosedForSend) {
(1..10).forEach {
// 如果缓冲区没有满,则立即添加元素,
// 如果缓冲区满了调用者会被挂起
// send() 是一个挂起函数,用于同步发送方和接收方的一种机制
channel.send(it)
// offer():如果缓冲区存在并且没有满立即向缓冲区添加一个元素
// 如果添加成功会返回true, 失败会返回 false
// channel.offer(it)
}
}
}
}
正如你所看到的 发送 和 接受 都有两个方法,分别来分析一下他们的区别。
send() 和 offer() 的区别:
send(element: E)
:如果缓冲区没有满,则立即添加元素, 如果缓冲区满了调用者会被挂起,send()
方法是一个挂起函数,用于同步发送方和接收方的一种机制offer(element: E): Boolean
:如果缓冲区存在并且没有满立即向缓冲区添加一个元素,添加成功会返回 true, 失败会返回 false
receive() 和 poll() 的区别:
receive(): E
:异步获取元素,如果缓冲区是空时调用者会被挂起,直到一个新值被发送到缓冲区,receive()
方法是一个挂起函数,用于同步发送方和接收方的一种机制poll(): E?
:用于同步获取一个元素,如果缓冲区是空的,则返回 null
Flow 与 Channel 的区别:
Flow :中间操作符 ( map
,filter
等等) 会构建了一个待执行的调用链,只有遇到末端操作符 (collect
,toList
等等) 才会触发所有操作的执行,所以 Flow 也被称为冷数据流Channel :发送方 (SendChannel) 发送数据,并不依赖于接受方(ReceiveChannel),所以 Channel 也被称为冷数据流
Channel 的不同类型
Channel 对应着有四种不同的类型:
RendezvousChannel
:这是默认的类型,大小为 0 的缓冲区,只有当send()
方法和receive()
方法都调用的时候,元素才会从发送方传输到接收方,否则将会被挂起LinkedListChannel
:会创建一个容量无限的缓冲区 (受限于内存的大小) ,send()
方法远不会挂起,offer()
方法始终返回 trueConflatedChannel
:最多缓冲一个元素,新元素会覆盖掉旧元素,只会接收最后发送的元素,之前的元素都会丢失,send()
方法永远不会挂起,offer()
方法始终返回 trueUnlimitedChannel
:会创建一个固定容量的数组缓冲区,send()
方法仅在缓冲区满时挂起,receive()
方法仅在缓冲区为空时挂起
创建四种不同类型 channel 的方式:
val rendezvousChannel = Channel<Int>()
val bufferedChannel = Channel<Int>(30)
val conflatedChannel = Channel<Int>(Channel.Factory.CONFLATED)
val unlimitedChannel = Channel<Int>(Channel.Factory.UNLIMITED)
BroadcastChannels 是什么
来看看 Kotlin 官方文档是如何介绍 BroadcastChannels
BroadcastChannels 是非阻塞的,它用于发送方 (SendChannel) 和接收方 (ReceiveChannel) 之间通信 BroadcastChannels 实现了 SendChannel 接口,所以只可以发送数据 BroadcastChannels 提供了 openSubscription
方法,会返回一个新的 ReceiveChannel,可以从缓冲区获取数据通过 BroadcastChannels 发送的数据,所有接收方 (ReceiveChannel) 都会收到,如下图所示
BroadcastChannels 是一个接口,而它的子类有 ConflatedBroadcastChannel、ArrayBroadcastChannel,这里主要介绍一下 ConflatedBroadcastChannel,ConflatedBroadcastChannel 重写了 openSubscription
方法。
public override fun openSubscription(): ReceiveChannel<E> {
val subscriber = Subscriber(this)
...... // 省略很多无关的代码
return subscriber
}
openSubscription
方法返回一个 ReceiveChannel 作为接受者在 openSubscription
方法内,创建了一个 Subscriber 的实例
Subscriber 其实是 ConflatedBroadcastChannel 的内部类,它实现了 ReceiveChannel 接口。
private class Subscriber<E>(
private val broadcastChannel: ConflatedBroadcastChannel<E>
) : ConflatedChannel<E>(), ReceiveChannel<E>
正如你所见 Subscriber 继承 ConflatedChannel 同时实现了 ReceiveChannel 接口,而 ConflatedChannel 在上文介绍过了,最多缓冲一个元素,新元素会覆盖掉旧元素,只会接收最后发送的元素,之前的元素都会丢失,所以 ConflatedBroadcastChannel 适合用来实现搜索相关的功能,因为用户只对最后一次搜索结果感兴趣。
注意: StateFlow 将会取代 ConflatedBroadcastChannel 下文有介绍
使用 ConflatedBroadcastChannel 实现 DB 搜索
我在 PokemonGo 项目中增加了两种搜索场景,分别通过 BroadcastChannels
和 StateFlow
来实现,通过 ConflatedBroadcastChannel 实现 DB 搜索,只需要两步
1.在 Activity 中监听 ConflatedBroadcastChannel 的变化src/main/java/com/hi/dhl/pokemon/ui/main/MainActivity.kt
// searchView 是一个 AppCompatEditText,当然你可以使用 androidx.appcompat.widget.SearchView,或者其他
searchView.addTextChangedListener {
val result = it.toString()
// 调用 queryParamterForDb 方法过滤用户的输入,并查询数据库
mViewModel.queryParamterForDb(result)
}
// 监听查询结果
mViewModel.searchResultForDb.observe(this, Observer {
mPokemonAdapter.submitData(lifecycle, it)
})
接受用户输入的数据,并调用 queryParamterForDb
方法过滤用户的输入,然后查询数据库通过 searchResultForDb.observe
方法监听查询结果
2. 在 MainViewModel 中实现 queryParamterForDb 方法src/main/java/com/hi/dhl/pokemon/ui/main/MainViewModel.kt
// 根据关键词搜索
fun queryParamterForDb(paramter: String) = mChanncel.offer(paramter)
// 使用 ConflatedBroadcastChannel 进行搜索
val searchResultForDb = mChanncel.asFlow()
// 避免在单位时间内,快输入造成大量的请求
.debounce(200)
// 避免重复的搜索请求。假设正在搜索 dhl,用户删除了 l 然后输入 l。最后的结果还是 dhl。它就不会再执行搜索查询 dhl
// distinctUntilChanged 对于 StateFlow 任何实例是没有效果的
.distinctUntilChanged()
.flatMapLatest { search -> // 只显示最后一次搜索的结果,忽略之前的请求
pokemonRepository.fetchPokemonByParameter(search).cachedIn(viewModelScope)
}
.catch { throwable ->
// 异常捕获
}.asLiveData()
通过
mChanncel.offer
发送数据通过
mChanncel.asFlow()
方法,将 Channel 转换为 Flow 并调用debounce
、distinctUntilChanged
、flatMapLatest
过掉用户的输入数据,这些操作符在后文会详细分析最后查询数据库,返回结果,项目中使用的是通过 Paging3 查询本地数据库,关于如何实现可以查看另外一篇文章
Jetpack 成员 Paging3 数据实践以及源码分析(一):
https://juejin.im/post/5ee998e8e51d4573d65df02b
重点: 在 Kotlin coroutines library (1.3.6) 版本中增加了一个新类 StateFlow,它的设计和 ConflatedBroadcastChannel 相同,将来计划完全取代 ConflatedBroadcastChannel
StateFlow 是什么
在前面的内容提到了很多次 StateFlow,那么 StateFlow 是什么,它与 Flows 和 Channels 有什么关系呢,来看看 Kotlin 官方文档是如何介绍 StateFlow
将上面这段话,简单的总结一下:
StateFlow 实现了 Flow 接口,它仅仅表示一种可读的状态,它的值是不变的,用于外部调用
public interface StateFlow<out T> : Flow<T> {
public val value: T // val 关键字表示不可变的
}StateFlow 提供了一个可变的版本 MutableStateFlow,它的值是可变的,用于内部调用
public interface MutableStateFlow<T> : StateFlow<T> {
public override var value: T // var 表示可变的
}StateFlow 与 Flow 的不同之处在于,StateFlow 仅仅表示一种状态,不依赖于特定的上下文,而 Flow 操作执行是在 CoroutineScope 内的,换句话说 StateFlow 不需要在协程的作用域内,它也可以执行
刚才我们提到 StateFlow 的出现是为了取代 ConflatedBroadcastChannel,那么它与 ConflatedBroadcastChannel 有什么不同之处:
StateFlow 实现更加简单,不需要实现所有 Channel API,而 ConflatedBroadcastChannel 在其内部封装了 ConflatedChannel 和 BroadcastChannels
StateFlow 内部有个变量 value,无论任何时候都可以安全的访问
StateFlow 实现读写分离,StateFlow 用来读而 MutableStateFlow 用来写
StateFlow 内部使用
Any.equals
来比较新值与旧值,和 distinctUntilChanged 方式相同,所以在 StateFlow 上应用 distinctUntilChanged 是没有效果的StateFlow 源码:
if (oldState == newState) return // 如果值没有改变,不会做任何事
distinctUntilChanged 源码
public fun <T, K> Flow<T>.distinctUntilChangedBy(keySelector: (T) -> K): Flow<T> =
distinctUntilChangedBy(keySelector = keySelector, areEquivalent = { old, new -> old == new })
使用 StateFlow 实现 NetWork 搜索
StateFlow 和 ConflatedBroadcastChannel 一样,实现搜索功能只需要两步
1.在 Activity 中监听 ConflatedBroadcastChannel 的变化src/main/java/com/hi/dhl/pokemon/ui/main/MainActivity.kt
// searchView 是一个 AppCompatEditText,当然你可以使用 androidx.appcompat.widget.SearchView 或者其他
searchView.addTextChangedListener {
val result = it.toString()
// 调用 queryParamterForNetWork 方法过滤用户的输入,并查询网络
mViewModel.queryParamterForNetWork(result)
}
mViewModel.searchResultMockNetWork.observe(this, Observer {
// 网络搜索回调监听
})
接受用户输入的数据,并调用 queryParamterForNetWork 方法过滤用户的输入,通过网络查询关键字 通过 searchResultMockNetWork.observe
方法监听查询结果
2. 在 MainViewModel 中实现 queryParamterForNetWork 方法src/main/java/com/hi/dhl/pokemon/ui/main/MainViewModel.kt
// 根据关键词搜索
fun queryParamterForNetWork(paramter: String) {
_stateFlow.value = paramter
}
// 因为没有合适的搜索接口,在这里模拟进行网络搜索
val searchResultMockNetWork =
// 避免在单位时间内,快输入造成大量的请求
stateFlow.debounce(200)
.filter { result ->
if (result.isEmpty()) { // 过滤掉空字符串等等无效输入
return@filter false
} else {
return@filter true
}
}
.flatMapLatest { // 只显示最后一次搜索的结果,忽略之前的请求
// 网络请求,这里替换自己的实现即可
}
.catch { throwable ->
// 异常捕获
}
.asLiveData()
通过 _stateFlow.value
更新数据调用 debounce
、filter
、flatMapLatest
等等操作符过滤掉无效的请求
常用操作符解析
在 PokemonGo 项目中使用 debounce
、filter
、flatMapLatest
、 distinctUntilChanged
等等操作符,一起来详细的分析一下这些操作符的含义,以及如何使用。
debounce
debounce
也叫做防抖动函数,当用户在很短的时间内输入 "d","dh","dhl",但是用户可能只对 "dhl" 的搜索结果感兴趣,因此我们必须舍弃 "d","dh" 过滤掉不需要的请求,针对于这个情况,我们可以使用 debounce
函数,在指定时间内出现多个字符串,debounce
始终只会发出最后一个字符串,我们来看个例子。
val result = flow {
emit("h")
emit("i")
emit("d")
delay(90)
emit("dh")
emit("dhl")
}.debounce(200).toList()
println(result) // 最后输出:dhl
filter
filter
操作符用于过滤不需要的字符串,在 PokemonGo 项目中只过滤了空字符串,我们来看个例子。
val result = flow {
emit("h")
emit("i")
emit("d")
delay(90)
emit("dh")
emit("dhl")
}.filter { result ->
if (!result.equals("dhl")) {
return@filter false
} else {
return@filter true
}
}.toList()
println(result) // 最后输出:dhl
flatMapLatest
flatMapLatest
避免向用户展示不需要的结果,只提供最后一个搜索查询(最新)的结果,例如,正在查询 "dh",然后用户输入 "dhl", 这个时候用户对 "dh" 的结果不感兴趣,可能只对 "dhl" 的结果感兴趣,这个时候可以使用 flatMapLatest
,我们来看个例子。
flow {
emit("dh")
emit("dhl")
}.flatMapLatest { value ->
flow<String> {
delay(100)
println("collected $value") // 最后输出 collected dhl
}
}.collect()
注意: flatMapLatest
在 Kotlin coroutines library (1.3.20) 以下版本使用会出现以下错误。
IllegalStateException crash: call to 'resume' before 'invoke' with coroutine
Kotlin 团队在 Kotlin coroutines library (1.3.20) 以上修复了这个问题,如果出现这个问题,将版本升级到 1.3.20 以上即可issues 地址:
https://github.com/Kotlin/kotlinx.coroutines/issues/864
DistinctUntilChanged
distinctUntilChanged
操作符用来过滤掉重复的请求,只有当前值与最后一个值不同时才将其发出,我们来看个例子。
val result = flow {
emit("d")
emit("d")
emit("d")
emit("d")
emit("dhl")
emit("dhl")
emit("dhl")
emit("dhl")
}.distinctUntilChanged().toList()
println(result) // 输出 [d, dhl]
StateFlow 内部已经实现了类似于 distinctUntilChanged
操作符的功能,因此 distinctUntilChanged 应用在 StateFlow 上是没有效果的
我们一起来分析 distinctUntilChanged
操作符源码是如何实现的
public fun <T> Flow<T>.distinctUntilChanged(): Flow<T> =
when (this) {
is StateFlow<*> -> this
else -> distinctUntilChangedBy { it }
}
distinctUntilChanged
是 Flow 的扩展函数如果当前对象是 StateFlow,直接返回调用者本身 如果不是 StateFlow 就会调用 distinctUntilChangedBy
方法
public fun <T, K> Flow<T>.distinctUntilChangedBy(keySelector: (T) -> K): Flow<T> =
distinctUntilChangedBy(keySelector = keySelector, areEquivalent = { old, new -> old == new })
最后会调用 areEquivalent
方法进行比较,会过滤掉所有相同值的
全文到这里就结束了,效果图如下所示
文章中提到的 PokemonGo(神奇宝贝) 是基于 Jetpack + MVVM + Data Mapper + Repository + Paging3 + App Startup + Hilt + Kotlin Flow + Motionlayout + Coil 等等技术综合实战项目。
PokemonGo(神奇宝贝)
https://github.com/hi-dhl/PokemonGo
参考文献
kotlinx.coroutines.channels kotlinx.coroutines.flow Implementing Search Filter using Kotlin Going deep on Flows & Channels Implement Instant Search Using Kotlin Flow Operators
最后推荐我一直在更新维护的项目和网站:
「为互联网人而设计,国内国外名站导航」涵括新闻、体育、生活、娱乐、设计、产品、运营、前端开发、Android 开发等等网址
https://site.51git.cn最新的 AndroidX Jetpack 相关组件的实战项目 以及 原理分析的文章
https://github.com/hi-dhl/AndroidX-Jetpack-PracticeLeetCode 算法题解涵盖:数组、栈、队列、字符串、链表、树,查找算法、搜索算法、位运算、排序等等,每道题目都会用 Java 和 kotlin 去实现
https://github.com/hi-dhl/Leetcode-Solutions-with-Java-And-Kotlin最新 Android 10 源码分析系列文章
https://github.com/hi-dhl/Leetcode-Solutions-with-Java-And-Kotlin一系列精选国外的技术文章,每篇文章都会有译者思考部分,对原文的更加深入的分析
https://github.com/hi-dhl/Technical-Article-Translation
长按二维码关注我